/**
 * 
 */
package com.opensource.netty.redis.proxy.spring.schema.support;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.InitializingBean;

import com.alibaba.fastjson.JSONObject;
import com.opensource.netty.redis.proxy.commons.constants.RedisConstants;
import com.opensource.netty.redis.proxy.commons.utils.StringUtils;
import com.opensource.netty.redis.proxy.core.cluster.LoadBalance;
import com.opensource.netty.redis.proxy.core.config.LBRedisServerMasterCluster;
import com.opensource.netty.redis.proxy.core.config.RedisPoolConfig;
import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerBean;
import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerClusterBean;
import com.opensource.netty.redis.proxy.core.enums.RedisProxyParamType;
import com.opensource.netty.redis.proxy.core.enums.ZkNodeType;
import com.opensource.netty.redis.proxy.core.registry.Registry;
import com.opensource.netty.redis.proxy.core.registry.impl.support.ZkUtils;
import com.opensource.netty.redis.proxy.core.url.RedisProxyURL;
import com.opensource.netty.redis.proxy.net.server.LBRedisServer;
import com.opensource.netty.redis.proxy.zk.registry.ZookeeperRegistryFactory;
import com.opensource.netty.redis.proxy.zk.registry.listen.ZookeeperRegistryListen;

/**
 * redis proxy节点
 * @author liubing
 *
 */
public class RedisProxyNode implements InitializingBean {

	
	private List<RedisProxyMaster> redisProxyMasters;
	
	private String redisProxyHost;//主机名
	
	private int redisProxyPort;//端口号
	
	private String algorithmRef;
	
	private LoadBalance loadMasterBalance;//主的一致性算法
	
	private String zkAddress;//ZK地址
	/**
	 * 
	 */
	public RedisProxyNode() {
		super();
	}

	/**
	 * @return the redisProxyMasters
	 */
	public List<RedisProxyMaster> getRedisProxyMasters() {
		return redisProxyMasters;
	}

	/**
	 * @param redisProxyMasters the redisProxyMasters to set
	 */
	public void setRedisProxyMasters(List<RedisProxyMaster> redisProxyMasters) {
		this.redisProxyMasters = redisProxyMasters;
	}
	
	/**
	 * 获取 master-cluster
	 * @return
	 */
	private LBRedisServerMasterCluster getFfanRedisServerMasterCluster(){
		
		
		List<LBRedisServerClusterBean> redisServerClusterBeans=new ArrayList<LBRedisServerClusterBean>();
		if(redisProxyMasters!=null&&redisProxyMasters.size()>0){
			for(RedisProxyMaster redisProxyMaster:redisProxyMasters){
				LBRedisServerClusterBean ffanRedisServerClusterBean=new LBRedisServerClusterBean();
				LBRedisServerBean lbRedisServerBean=convertLBRedisServerBean(redisProxyMaster);
				ffanRedisServerClusterBean.setLoadClusterBalance(redisProxyMaster.getLoadClusterBalance());
				ffanRedisServerClusterBean.setRedisServerMasterBean(lbRedisServerBean);
				
				List<LBRedisServerBean> ffanRedisServerBeans=new ArrayList<LBRedisServerBean>();
				
				for(RedisProxySlave proxySlave:redisProxyMaster.getRedisProxyClusters()){
					LBRedisServerBean redisServerBean=convertSlaveLBRedisServerBean(proxySlave);
					ffanRedisServerBeans.add(redisServerBean);
					ffanRedisServerClusterBean.setRedisServerSlaveBeans(ffanRedisServerBeans);
				}
				redisServerClusterBeans.add(ffanRedisServerClusterBean);
			}
		}
		LBRedisServerMasterCluster ffanRedisServerMasterCluster=new LBRedisServerMasterCluster(redisServerClusterBeans);
		ffanRedisServerMasterCluster.setRedisProxyHost(redisProxyHost);
		ffanRedisServerMasterCluster.setRedisProxyPort(redisProxyPort);
		ffanRedisServerMasterCluster.setLoadMasterBalance(getLoadMasterBalance());
		return ffanRedisServerMasterCluster;

	}
	
	 /**
	   * 转换
	   * @param lbRedisServerBean
	   * @return
	   */
	  private LBRedisServerBean convertSlaveLBRedisServerBean(RedisProxySlave proxySlave){
		  LBRedisServerBean lbRedisServerBean=new LBRedisServerBean();
		  RedisPoolConfig redisPoolConfig=proxySlave.getRedisPoolConfig();
		  
		  lbRedisServerBean.setHost(proxySlave.getHost());
		  lbRedisServerBean.setPort(proxySlave.getPort());
		  lbRedisServerBean.setRedisPoolConfig(redisPoolConfig);
		  return lbRedisServerBean;
	  }
	
	 /**
	   * 转换
	   * @param lbRedisServerBean
	   * @return
	   */
	  private LBRedisServerBean convertLBRedisServerBean(RedisProxyMaster redisProxyMaster){
		  LBRedisServerBean lbRedisServerBean=new LBRedisServerBean();		  
		  RedisPoolConfig redisPoolConfig=redisProxyMaster.getRedisPoolConfig();
		  
		  lbRedisServerBean.setHost(redisProxyMaster.getHost());
		  lbRedisServerBean.setPort(redisProxyMaster.getPort());
		  lbRedisServerBean.setRedisPoolConfig(redisPoolConfig);
		  return lbRedisServerBean;
	  }
	
	/**
	 * 设置以后初始化
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		LBRedisServerMasterCluster ffanRedisServerMasterCluster=getFfanRedisServerMasterCluster();
		initRegistry(ffanRedisServerMasterCluster);
		startRedisProxyServer(ffanRedisServerMasterCluster);
	
	}
	
	/**
	 * 初始化zk
	 * @param ffanRedisServerMasterCluster
	 */
	private void initRegistry(LBRedisServerMasterCluster ffanRedisServerMasterCluster){
		ZookeeperRegistryFactory zookeeperRegistryFactory=new ZookeeperRegistryFactory();

		RedisProxyURL redisProxyURL=new RedisProxyURL();
		redisProxyURL.addParameter(RedisConstants.ADDRESS, this.getZkAddress());
		Registry registry=zookeeperRegistryFactory.getRegistry(redisProxyURL);
		ZookeeperRegistryListen registryListen=new ZookeeperRegistryListen(ffanRedisServerMasterCluster);
		if(ffanRedisServerMasterCluster.getMasters()!=null&&ffanRedisServerMasterCluster.getMasters().size()>0){
			for(LBRedisServerBean redisMasterServerBean:ffanRedisServerMasterCluster.getMasters()){
				RedisProxyURL newRedisProxyURL=new RedisProxyURL(redisMasterServerBean.getHost(),redisMasterServerBean.getPort(),redisMasterServerBean.getRedisPoolConfig().getTimeout());
				newRedisProxyURL.addParameter(RedisProxyParamType.REDISSERVER.getName(), JSONObject.toJSONString(redisMasterServerBean));				
				Boolean masterFlag=checkRedisValueFromZookeeper(newRedisProxyURL, redisMasterServerBean, registry);//检查主
				
				List<LBRedisServerBean> ffanRedisServerBeans=ffanRedisServerMasterCluster.getMasterFfanRedisServerBean(redisMasterServerBean.getKey());
				if(ffanRedisServerBeans!=null){
					for(LBRedisServerBean ffanClusterRedisServerBean:ffanRedisServerBeans){//从
						RedisProxyURL newClusterRedisProxyURL=new RedisProxyURL(ffanClusterRedisServerBean.getHost(),ffanClusterRedisServerBean.getPort(),ffanClusterRedisServerBean.getRedisPoolConfig().getTimeout());
						newClusterRedisProxyURL.setParentServerPath(redisMasterServerBean.getServerKey());
						Boolean flag=checkRedisValueFromZookeeper(newClusterRedisProxyURL, ffanClusterRedisServerBean, registry);
						if(!flag&&!masterFlag){//没有值
							JSONObject slaveJSONObject=new JSONObject();
							slaveJSONObject.put("host", ffanClusterRedisServerBean.getHost());
							slaveJSONObject.put("port", ffanClusterRedisServerBean.getPort());
							slaveJSONObject.put("weight", ffanClusterRedisServerBean.getWeight());//权重
							registry.createPersistent(newClusterRedisProxyURL, slaveJSONObject.toJSONString());//写入从
						}
						registry.registerSlave(newClusterRedisProxyURL, registryListen);
					}
				}
				
				if(!masterFlag){//没有值
					JSONObject masterJSONObject=new JSONObject();
					masterJSONObject.put("host", redisMasterServerBean.getHost());
					masterJSONObject.put("port", redisMasterServerBean.getPort());
					registry.createPersistent(newRedisProxyURL, masterJSONObject.toJSONString());//写入主
				}
				registry.register(newRedisProxyURL, registryListen);//系统初始化不监控主的变化，启动成功以后监控
			}
			
		}
		
	}
	
	
	/**
	 * 检查从zk是否有值
	 * @return
	 */
	private Boolean checkRedisValueFromZookeeper(RedisProxyURL url,LBRedisServerBean ffanSlaveRedisServerBean,Registry registry){
		 String nodeTypePath = ZkUtils.toNodePath(url,url.getParentServerPath(), ZkNodeType.AVAILABLE_SERVER);
		 if(registry.exist(nodeTypePath)){
			 String data=registry.readData(nodeTypePath, true);
			 if(StringUtils.isEmpty(data)){
				 return false;
			 }
			 JSONObject jsonObject=JSONObject.parseObject(data);
			 ffanSlaveRedisServerBean.setHost(jsonObject.getString("host"));
			 ffanSlaveRedisServerBean.setPort(jsonObject.getIntValue("port"));
			 return true;
		 }
		 return false;
	}
	
	/**
	 * 启动server 容器
	 */
	private void startRedisProxyServer(LBRedisServerMasterCluster ffanRedisServerMasterCluster){
		LBRedisServer ffanRedisServer=new LBRedisServer(ffanRedisServerMasterCluster);
		
		ffanRedisServer.start();
	}
	/**
	 * @return the redisProxyHost
	 */
	public String getRedisProxyHost() {
		return redisProxyHost;
	}

	/**
	 * @param redisProxyHost the redisProxyHost to set
	 */
	public void setRedisProxyHost(String redisProxyHost) {
		this.redisProxyHost = redisProxyHost;
	}

	/**
	 * @return the redisProxyPort
	 */
	public int getRedisProxyPort() {
		return redisProxyPort;
	}

	/**
	 * @param redisProxyPort the redisProxyPort to set
	 */
	public void setRedisProxyPort(int redisProxyPort) {
		this.redisProxyPort = redisProxyPort;
	}

	/**
	 * @return the loadMasterBalance
	 */
	public LoadBalance getLoadMasterBalance() {
		return loadMasterBalance;
	}

	/**
	 * @param loadMasterBalance the loadMasterBalance to set
	 */
	public void setLoadMasterBalance(LoadBalance loadMasterBalance) {
		this.loadMasterBalance = loadMasterBalance;
	}

	/**
	 * @return the algorithmRef
	 */
	public String getAlgorithmRef() {
		return algorithmRef;
	}

	/**
	 * @param algorithmRef the algorithmRef to set
	 */
	public void setAlgorithmRef(String algorithmRef) {
		this.algorithmRef = algorithmRef;
	}

	/**
	 * @return the zkAddress
	 */
	public String getZkAddress() {
		return zkAddress;
	}

	/**
	 * @param zkAddress the zkAddress to set
	 */
	public void setZkAddress(String zkAddress) {
		this.zkAddress = zkAddress;
	}

	
	
	
}
